Lab 3 - Online Purchase Recommendations

Learn how to create a recommendation engine using the Alternating Least Squares algorithm in Spark's machine learning library

The data

This is a transactional data set containing all the transactions that occurred between 01/12/2010 and 09/12/2011 for a UK-based, registered, non-store online retailer. The company mainly sells unique all-occasion gifts. Many of its customers are wholesalers.

http://archive.ics.uci.edu/ml/datasets/Online+Retail
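Each row of the csv is one line item of an invoice, with the columns InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, and Country. The parsing code below indexes these columns by position.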

Create an RDD from the csv data


In [2]:
#Put the csv into an RDD (at first, each row in the RDD is a string which
#correlates to a line in the csv)
retailData = sc.textFile("OnlineRetail.csv")
print retailData.take(2)


(Note: if OnlineRetail.csv is not in the notebook's working directory, this cell fails with org.apache.hadoop.mapred.InvalidInputException: Input path does not exist. Because textFile is lazy, the error only surfaces when take(2) forces evaluation.)

Prepare and shape the data


In [ ]:
from pyspark.mllib.recommendation import ALS, Rating
import re

#Remove the header from the RDD
header = retailData.first()
retailData = retailData.filter(lambda line: line != header)

#To produce the ALS model, we need to train it with each individual
#purchase.  Each record in the RDD must be the customer id, 
#item id, and the rating.  In this case, the rating is the quantity
#ordered.  MLlib assembles these into a sparse ratings matrix, which
#ALS then factors.
#Columns after the split: l[1]=StockCode, l[3]=Quantity, l[6]=CustomerID.
#Keep rows with a positive quantity, a stock code containing digits,
#and a non-empty customer id.
retailData = retailData.map(lambda l: l.split(",")).\
    filter(lambda l: int(l[3]) > 0 and len(re.sub("\D", "", l[1])) != 0 and len(l[6]) != 0).\
    map(lambda l: (int(l[6]),int(re.sub("\D", "", l[1])),int(l[3])))

#Randomly split the data into a testing set and a training set
testRDD, trainRDD = retailData.randomSplit([.2,.8])

trainData = trainRDD.map(lambda l:  Rating(l[0],l[1],l[2]))

print trainData.take(2)
print
print testRDD.take(2)
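
One caveat: splitting on "," misparses rows whose Description field contains a comma (the columns shift, and int(l[3]) can raise a ValueError). A more robust sketch, not used in the rest of this lab, parses each line with Python's csv module:


In [ ]:
import csv
from StringIO import StringIO

#Parse one csv line, respecting quoted fields that contain commas
def parseLine(line):
    return next(csv.reader(StringIO(line)))

#Drop-in replacement for the split: retailData.map(parseLine)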

Build the recommendation model


In [ ]:
#Use the training RDD to train a model with Alternating Least Squares
#rank=5
#5 columns in the user-feature and product-feature matrices
#iterations=10
#10 iterations of the alternating optimization
rank = 5
numIterations = 10
model = ALS.train(trainData, rank, numIterations)

print "The model has been trained"

Test the model


In [ ]:
#Evaluate the model with the test rdd by using the predictAll function
predict = model.predictAll(testRDD.map(lambda l: (l[0],l[1])))

#Calculate and print the Mean Squared Error
predictions = predict.map(lambda l: ((l[0],l[1]), l[2]))
ratingsAndPredictions = testRDD.map(lambda l: ((l[0], l[1]), l[2])).join(predictions)

ratingsAndPredictions.cache()
print ratingsAndPredictions.take(3)

meanSquaredError = ratingsAndPredictions.map(lambda l: (l[1][0] - l[1][1])**2).mean()
print
print 'Mean squared error = %.4f' % meanSquaredError
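
Taking the square root gives the root mean squared error, which is at least in the same units as the quantities:


In [ ]:
from math import sqrt
print 'Root mean squared error = %.4f' % sqrt(meanSquaredError)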
Neither number tells us much about recommendation quality on its own, because the "ratings" are unbounded purchase quantities rather than scores on a fixed scale. A better sanity check is to look at some actual recommendations.

In [ ]:
#Top 5 product recommendations for customer 15544
recs = model.recommendProducts(15544,5)
for rec in recs:
    print rec

This user seems to have purchased a lot of children's gifts and some holiday items. The recommendation engine we created suggested some items along these lines.

In [ ]:
#Example output from one run (exact ratings will vary):
#Rating(user=15544, product=84568, rating=193.03195106065823)
#GIRLS ALPHABET IRON ON PATCHES 

#Rating(user=15544, product=16033, rating=179.45915040198466)
#MINI HIGHLIGHTER PENS

#Rating(user=15544, product=22266, rating=161.04293255928698)
#EASTER DECORATION HANGING BUNNY

#Rating(user=15544, product=84598, rating=141.00162368678377)
#BOYS ALPHABET IRON ON PATCHES

#Rating(user=15544, product=72803, rating=129.54033486738518)
#ROSE SCENT CANDLE JEWELLED DRAWER
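
The product descriptions above come from the source data. A sketch of automating that lookup by mapping numeric StockCodes back to Description strings (variable names are illustrative; distinct()/collectAsMap() keeps one arbitrary description per code):


In [ ]:
#Build a {numeric stock code: description} map from the raw csv
rawRetail = sc.textFile("OnlineRetail.csv")
rawHeader = rawRetail.first()
descriptions = rawRetail.filter(lambda line: line != rawHeader).\
    map(lambda l: l.split(",")).\
    filter(lambda l: len(l) > 2 and len(re.sub("\D", "", l[1])) != 0).\
    map(lambda l: (int(re.sub("\D", "", l[1])), l[2])).\
    distinct().collectAsMap()

for rec in recs:
    print rec.product, descriptions.get(rec.product)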
Data Citation

Daqing Chen, Sai Liang Sain, and Kun Guo, Data mining for the online retail industry: A case study of RFM model-based customer segmentation using data mining, Journal of Database Marketing and Customer Strategy Management, Vol. 19, No. 3, pp. 197–208, 2012 (Published online before print: 27 August 2012. doi: 10.1057/dbm.2012.17).